{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "fb10790c",
   "metadata": {},
   "source": [
    "# Data Flows with Kafka\n",
    "\n",
    "Components:\n",
    "- Kafka for persistence and information exchange.\n",
    "- Proxy scheduler for ease of testing.\n",
    "- Kotlin transformer engine to wrap Kafka Streams.\n",
    "\n",
    "## Prerequisites\n",
    "\n",
    "- Docker\n",
    "- JRE (and JDK for development)\n",
    "- `kafkacat`/`kcat` or other CLI tool for interacting with Kafka\n",
    "    - https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html\n",
    "- _(optional)_ `grpcurl` for checking interaction with Kotlin engine"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0f3b780d",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "You will need to run Kafka and a (proxy) scheduler before the Kotlin engine can start up successfully.\n",
    "\n",
    "```bash\n",
    "# .../scheduler\n",
    "make start-kafka-host\n",
    "make start-proxy-local\n",
    "```\n",
    "\n",
    "```bash\n",
    "# .../data-flow\n",
    "./gradlew build\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1223d32e",
   "metadata": {},
   "source": [
    "The input topic for a given step of the data-flow engine is the _output_ topic of a model/component, so it contains _responses_.\n",
    "\n",
    "Conversely, the data-flow engine writes _requests_ to the _input_ topic of another model/component, and that topic is the _output_ of the step.\n",
    "\n",
    "Below, we number each step, e.g. 1 or 2, and name the variables after the inputs and outputs of that step.\n",
    "The actual topic names are the other way around."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "c6c540a1",
   "metadata": {},
   "outputs": [],
   "source": [
    "inputTopic1 = 'seldon.some-namespace.some-model-1.outputs'\n",
    "outputTopic1 = 'seldon.some-namespace.some-model-2.inputs'\n",
    "\n",
    "inputTopic2 = 'seldon.some-namespace.some-model-3.outputs'\n",
    "outputTopic2 = 'seldon.some-namespace.some-model-4.inputs'"
   ]
  },
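  {
   "cell_type": "markdown",
   "id": "a7c41e02",
   "metadata": {},
   "source": [
    "As a rough illustration of this naming, the sketch below builds the input and output topics of a step from model names. It assumes the `seldon.<namespace>.<model>.<inputs|outputs>` pattern seen in the variables above; `model_topic` and `step_topics` are hypothetical helpers, not part of the data-flow engine.\n",
    "\n",
    "```python\n",
    "def model_topic(namespace, model, suffix):\n",
    "    # Assumed pattern, inferred from the topic names used in this notebook.\n",
    "    return f'seldon.{namespace}.{model}.{suffix}'\n",
    "\n",
    "def step_topics(namespace, upstream_model, downstream_model):\n",
    "    # A step reads the upstream model's responses...\n",
    "    step_input = model_topic(namespace, upstream_model, 'outputs')\n",
    "    # ...and writes requests for the downstream model.\n",
    "    step_output = model_topic(namespace, downstream_model, 'inputs')\n",
    "    return step_input, step_output\n",
    "\n",
    "print(step_topics('some-namespace', 'some-model-1', 'some-model-2'))\n",
    "```\n",
    "\n",
    "The step's input ends in `.outputs` and its output ends in `.inputs`, which is the inversion described above."
   ]
  },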
  {
   "cell_type": "markdown",
   "id": "2937ce1c",
   "metadata": {},
   "source": [
    "## Check there are no existing topics (no cheating)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "78f4b6b8",
   "metadata": {},
   "outputs": [],
   "source": [
    "!kafkacat -b localhost:9092 -L | grep -i seldon -A1"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8076a88f",
   "metadata": {},
   "source": [
    "## Write to Kafka topic"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "e6e5623d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "make: Entering directory '/home/agr/source/agrski/seldon-core-v2/scheduler'\n",
      "go build -o data-flow/scripts/bin/producer ./data-flow/scripts/producer.go ./data-flow/scripts/common.go\n",
      "make: Leaving directory '/home/agr/source/agrski/seldon-core-v2/scheduler'\n"
     ]
    }
   ],
   "source": [
    "!make -C .. build-dataflow-producer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "493e08bc",
   "metadata": {},
   "outputs": [],
   "source": [
    "!../data-flow/scripts/bin/producer --input-topics $inputTopic1,$inputTopic2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "7bd5614f",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  topic \"seldon.some-namespace.some-model-1.outputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n",
      "  topic \"seldon.some-namespace.some-model-3.outputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -L | grep -i seldon -A1"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4bdffb42",
   "metadata": {},
   "source": [
    "## Consume from Kafka topic\n",
    "\n",
    "The data-flow engine should ignore half of these messages because their headers do not match the pipeline name, i.e. 10 messages on each input topic -> 5 messages on the corresponding output topic."
   ]
  },
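  {
   "cell_type": "markdown",
   "id": "c3e95b17",
   "metadata": {},
   "source": [
    "The header-based filtering can be pictured with a minimal sketch. Everything specific here is an assumption made for illustration: the header key `pipeline`, the two pipeline names, and the alternating pattern are made up, and the real engine does this inside Kafka Streams rather than in Python.\n",
    "\n",
    "```python\n",
    "# Hypothetical header key and pipeline names, for illustration only.\n",
    "PIPELINE_HEADER = 'pipeline'\n",
    "\n",
    "def matches_pipeline(headers, pipeline_name):\n",
    "    # headers: list of (key, value-bytes) pairs, as Kafka clients commonly expose them.\n",
    "    wanted = pipeline_name.encode()\n",
    "    return any(k == PIPELINE_HEADER and v == wanted for k, v in headers)\n",
    "\n",
    "# Ten fake messages, alternating between two made-up pipeline names.\n",
    "names = [b'some-pipeline', b'other-pipeline']\n",
    "messages = [{'offset': i, 'headers': [(PIPELINE_HEADER, names[i % 2])]} for i in range(10)]\n",
    "\n",
    "kept = [m for m in messages if matches_pipeline(m['headers'], 'some-pipeline')]\n",
    "print(len(kept), 'of', len(messages), 'messages kept')\n",
    "```\n",
    "\n",
    "Only messages whose header value equals the pipeline name pass the filter, so half of the fake messages are dropped."
   ]
  },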
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "1557fe1a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Offset 0\tlength 77\tcontents 168455535\n",
      "Offset 1\tlength 77\tcontents 168455535\n",
      "Offset 2\tlength 77\tcontents 168455535\n",
      "Offset 3\tlength 77\tcontents 168455535\n",
      "Offset 4\tlength 77\tcontents 168455535\n",
      "Offset 5\tlength 77\tcontents 168455535\n",
      "Offset 6\tlength 77\tcontents 168455535\n",
      "Offset 7\tlength 77\tcontents 168455535\n",
      "Offset 8\tlength 77\tcontents 168455535\n",
      "Offset 9\tlength 77\tcontents 168455535\n",
      "% Reached end of topic seldon.some-namespace.some-model-1.outputs [0] at offset 10: exiting\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -t $inputTopic1 -C -o beginning -e -f 'Offset %o\\tlength %S\\tcontents %s\\n' -s value=i"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "41fcf7eb",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Offset 0\tlength 77\tcontents 168455535\n",
      "Offset 1\tlength 77\tcontents 168455535\n",
      "Offset 2\tlength 77\tcontents 168455535\n",
      "Offset 3\tlength 77\tcontents 168455535\n",
      "Offset 4\tlength 77\tcontents 168455535\n",
      "Offset 5\tlength 77\tcontents 168455535\n",
      "Offset 6\tlength 77\tcontents 168455535\n",
      "Offset 7\tlength 77\tcontents 168455535\n",
      "Offset 8\tlength 77\tcontents 168455535\n",
      "Offset 9\tlength 77\tcontents 168455535\n",
      "% Reached end of topic seldon.some-namespace.some-model-3.outputs [0] at offset 10: exiting\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -t $inputTopic2 -C -o beginning -e -f 'Offset %o\\tlength %S\\tcontents %s\\n' -s value=i"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d62de62b",
   "metadata": {},
   "source": [
    "## Transform data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b8c1511",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "! pushd ../data-flow/ \\\n",
    "    && ./gradlew run \\\n",
    "        --no-daemon \\\n",
    "        --args=\"--kafka-bootstrap-servers=localhost:9092 --upstream-host=localhost --upstream-port=10101 --num-cores=2\" \\\n",
    "    && popd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "7af54bc8",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "  topic \"seldon.some-namespace.some-model-1.outputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n",
      "  topic \"seldon.some-namespace.some-model-2.inputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n",
      "  topic \"seldon.some-namespace.some-model-3.outputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n",
      "  topic \"seldon.some-namespace.some-model-4.inputs\" with 1 partitions:\r\n",
      "    partition 0, leader 1, replicas: 1, isrs: 1\r\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -L | grep -i seldon -A1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "744c938a",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Offset 0\tlength 62\tcontents 436483123\n",
      "Offset 1\tlength 62\tcontents 436483123\n",
      "Offset 2\tlength 62\tcontents 436483123\n",
      "Offset 3\tlength 62\tcontents 436483123\n",
      "Offset 4\tlength 62\tcontents 436483123\n",
      "% Reached end of topic seldon.some-namespace.some-model-2.inputs [0] at offset 5: exiting\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -t $outputTopic1 -C -o beginning -e -f 'Offset %o\\tlength %S\\tcontents %s\\n' -s value=i"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "b9a2becc",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Offset 0\tlength 34\tcontents 436483123\n",
      "Offset 1\tlength 34\tcontents 436483123\n",
      "Offset 2\tlength 34\tcontents 436483123\n",
      "Offset 3\tlength 34\tcontents 436483123\n",
      "Offset 4\tlength 34\tcontents 436483123\n",
      "% Reached end of topic seldon.some-namespace.some-model-4.inputs [0] at offset 5: exiting\n"
     ]
    }
   ],
   "source": [
    "!kafkacat -b localhost:9092 -t $outputTopic2 -C -o beginning -e -f 'Offset %o\\tlength %S\\tcontents %s\\n' -s value=i"
   ]
  },
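  {
   "cell_type": "markdown",
   "id": "e5d20f46",
   "metadata": {},
   "source": [
    "The `contents` column is the same on every row because `-s value=i` prints only a single 32-bit integer read from the start of each message value, not the whole payload. The sketch below recovers those leading bytes, assuming the integer was read as big-endian, which the byte patterns are consistent with; `first_four_bytes` is a hypothetical helper.\n",
    "\n",
    "```python\n",
    "import struct\n",
    "\n",
    "def first_four_bytes(value):\n",
    "    # Pack the integer back into four big-endian bytes (assumed byte order).\n",
    "    return struct.pack('>i', value)\n",
    "\n",
    "# Values copied from the kafkacat output for the input and output topics.\n",
    "for contents in (168455535, 436483123):\n",
    "    print(contents, first_four_bytes(contents).hex())\n",
    "```\n",
    "\n",
    "Both results look like the start of a serialized message rather than tensor data, so identical values across rows do not mean the payloads are identical."
   ]
  },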
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "301356f2",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "make: Entering directory '/home/agr/source/agrski/seldon-core-v2/scheduler'\n",
      "go build -o data-flow/scripts/bin/consumer ./data-flow/scripts/consumer.go ./data-flow/scripts/common.go\n",
      "make: Leaving directory '/home/agr/source/agrski/seldon-core-v2/scheduler'\n"
     ]
    }
   ],
   "source": [
    "!make -C .. build-dataflow-consumer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "afa6e0f2",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-4.inputs: id:\"4312\" inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:0 int_contents:0}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-4.inputs: id:\"4312\" inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:1 int_contents:1}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-4.inputs: id:\"4312\" inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:2 int_contents:2}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-4.inputs: id:\"4312\" inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:3 int_contents:3}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-4.inputs: id:\"4312\" inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:4 int_contents:4}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-2.inputs: id:\"4312\" inputs:{name:\"tensor0\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:0 int_contents:0}} inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:0 int_contents:0}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-2.inputs: id:\"4312\" inputs:{name:\"tensor0\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:1 int_contents:1}} inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:1 int_contents:1}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-2.inputs: id:\"4312\" inputs:{name:\"tensor0\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:2 int_contents:2}} inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:2 int_contents:2}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-2.inputs: id:\"4312\" inputs:{name:\"tensor0\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:3 int_contents:3}} inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:3 int_contents:3}} \r\n",
      "\u001b[36mINFO\u001b[0m[0003] received inference response on topic seldon.some-namespace.some-model-2.inputs: id:\"4312\" inputs:{name:\"tensor0\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:4 int_contents:4}} inputs:{name:\"tensor1\" datatype:\"INT32\" shape:1 shape:2 contents:{int_contents:4 int_contents:4}} \r\n"
     ]
    }
   ],
   "source": [
    "!../data-flow/scripts/bin/consumer --output-topics $outputTopic1,$outputTopic2"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
